PythonのStrEnumを使ってSNSトピックにPublishした場合、文字列を受け取れるのか試してみた
PythonのStrEnumを使ってSNSトピックにメッセージをPublishした場合の動作を試してみました。
2024.09.30
Python 3.11 から、「StrEnum」が追加されました。
StrEnumを使うことで、文字列同士を比較するよりも可読性やメンテナンス性が向上しますし、単なるEnumよりもわかりやすくなります(文字列を扱う場合)。
そして、ふと気になりました。このStrEnumを利用してJSONを作成したり、SNSトピックにPublishしたとき、受け取った側ではどうなっているんだろう?と。
試してみました。
おすすめの方
- StrEnumの雰囲気を知りたい方
- StrEnumを利用してJSONを作成したい方
- StrEnumを利用したJSONをSNSトピックにPublishして、受け取った場合にどうなるか知りたい方
LambdaとSNSをデプロイする
sam init
sam init \
--runtime python3.11 \
--name lambda-sns-strenum-sample \
--app-template hello-world \
--no-tracing \
--no-application-insights \
--structured-logging \
--package-type Zip
SAMテンプレート
1つのSNSトピックと2つのLambdaを作成します。Lambdaは、SNSトピックをPublishするLambdaとSubscribeするLambdaを用意します。
template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: lambda-sns-strenum-sample
Globals:
Function:
Timeout: 3
LoggingConfig:
LogFormat: JSON
Resources:
SampleTopic:
Type: AWS::SNS::Topic
PublishTopicFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: hello_world/
Handler: publish.lambda_handler
Runtime: python3.11
Architectures:
- x86_64
Policies:
- arn:aws:iam::aws:policy/AmazonSNSFullAccess
Environment:
Variables:
TOPIC_ARN: !Ref SampleTopic
PublishTopicFunctionLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub /aws/lambda/${PublishTopicFunction}
SubscribeTopicFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: hello_world/
Handler: subscribe.lambda_handler
Runtime: python3.11
Architectures:
- x86_64
Events:
HelloWorld:
Type: SNS
Properties:
Topic: !Ref SampleTopic
SubscribeTopicFunctionLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub /aws/lambda/${SubscribeTopicFunction}
Lambdaコード
StrEnumを作る
適当に作ります。
fruit.py
from enum import StrEnum
class Fruit(StrEnum):
APPLE = "apple"
BANANA = "banana"
ORANGE = "orange"
SNSトピックをPublishするLambda
publish.py
import os
import json
import boto3
from fruit import Fruit
client = boto3.client("sns")
def lambda_handler(event, context):
client.publish(
TopicArn=os.environ["TOPIC_ARN"],
Message=json.dumps({"fruit": Fruit.APPLE, "hello": "world"}),
)
SNSトピックをSubscribeするLambda
受け取ったSNSトピックのメッセージをそのまま出力したり、StrEnumと比較ができるかを試してみます。
subscribe.py
import json
from fruit import Fruit
def lambda_handler(event, context):
data = json.loads(event["Records"][0]["Sns"]["Message"])
print(data)
if data["fruit"] == Fruit.APPLE:
print("Apple")
else:
print("Not Apple")
デプロイ
sam deploy \
--guided \
--region ap-northeast-1 \
--stack-name lambda-sns-strenum-sample-stack
動作を確認する
SNSトピックをPublishするLambdaを実行して、とSubscribeするLambdaのログを確認します。
受け取ったSNSトピックのメッセージは、しっかりと文字列になっていました。また、StrEnumとの比較もできました。